{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create SparkContext & SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "sc = SparkContext(master = 'local')\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder \\\n",
    "          .appName(\"Python Spark SQL basic example\") \\\n",
    "          .config(\"spark.some.config.option\", \"some-value\") \\\n",
    "          .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TF, IDF and TF-IDF"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "* TF is short for **Term Frequency**. It is simply the number of times a term occurs in a document. The higher the TF of a term, the more important that term is to that document.\n",
    "\n",
    "* IDF is short for **Inverse Document Frequency**. The Document Frequency (DF) of a term is the fraction of documents in the corpus that contain the term, and the IDF is derived from its inverse. If a term appears in every single document, its DF takes the largest possible value, 1, and its IDF takes the smallest possible value. Such a term is non-informative for classifying the documents. The IDF is therefore a measure of how relevant a term is: the higher the IDF, the more relevant the term.\n",
    "\n",
    "* TF-IDF is the product of TF and IDF. A high TF-IDF is obtained when the Term Frequency is high and the Document Frequency is low (that is, the IDF is high).\n"
   ]
  },
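  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a sketch, the definitions above can be written as formulas. The symbols are introduced here for illustration: $|D|$ is the number of documents in the corpus, $\\mathrm{TF}(t, d)$ is the count of term $t$ in document $d$, and $\\mathrm{DF}(t, D)$ is the number of documents that contain $t$ (a count rather than a fraction). The smoothed IDF shown is the form given in the Spark MLlib documentation; other libraries use slightly different variants.\n",
    "\n",
    "$$\\mathrm{IDF}(t, D) = \\log\\frac{|D| + 1}{\\mathrm{DF}(t, D) + 1}, \\qquad \\mathrm{TFIDF}(t, d, D) = \\mathrm{TF}(t, d) \\cdot \\mathrm{IDF}(t, D)$$\n",
    "\n",
    "For the three-document corpus created later in this notebook, `spark` appears in every document, so $\\mathrm{IDF} = \\log(4/4) = 0$, while `awesome` appears in only one document, so $\\mathrm{IDF} = \\log(4/2) = \\log 2 \\approx 0.69$."
   ]
  },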
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Term Frequency, HashingTF and CountVectorizer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "PySpark has two feature transformers for calculating term frequencies from documents: **`HashingTF()`** and **`CountVectorizer()`**. Both do two things:\n",
    "\n",
    "1. Index terms: convert words to numbers.\n",
    "2. Calculate the term frequencies for each document.\n",
    "\n",
    "`HashingTF()` uses the MurmurHash3 function to map a raw feature (a term) to an index (a number). Hashing is the process of transforming data of arbitrary size into fixed-size, usually shorter, data. The term frequencies are then calculated from the generated indices. For `HashingTF()`, the mapping is very cheap because each term-to-index mapping is independent of every other one, so no vocabulary has to be built first. The hash function is deterministic: the same term always maps to the same index. The mapping is not guaranteed to be unique, however. A **hashing collision** may occur, which means that different features (terms) are hashed to the same index.\n",
    "\n",
    "**`CountVectorizer()`** indexes terms in descending order of their term frequencies in the entire corpus, NOT their term frequencies within a single document. After indexing, the term frequencies are calculated per document."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create some data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------------------------------+\n",
      "|terms                                      |\n",
      "+-------------------------------------------+\n",
      "|[spark, spark, spark, is, awesome, awesome]|\n",
      "|[I, love, spark, very, very, much]         |\n",
      "|[everyone, should, use, spark]             |\n",
      "+-------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import pandas as pd\n",
    "pdf = pd.DataFrame({\n",
    "        'terms': [\n",
    "            ['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'],\n",
    "            ['I', 'love', 'spark', 'very', 'very', 'much'],\n",
    "            ['everyone', 'should', 'use', 'spark']\n",
    "        ]\n",
    "    })\n",
    "df = spark.createDataFrame(pdf)\n",
    "df.show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## HashingTF\n",
    "\n",
    "The **numFeatures** parameter takes an integer, which should be well above the number of distinct terms in the corpus to keep hashing collisions rare. It should also be a power of two so that features are mapped evenly to columns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import HashingTF\n",
    "from pyspark.ml import Pipeline\n",
    "\n",
    "hashtf = HashingTF(numFeatures=pow(2, 4), inputCol='terms', outputCol='features(numFeatures), [index], [term frequency]')\n",
    "stages = [hashtf]\n",
    "pipeline = Pipeline(stages=stages)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------------------------------+------------------------------------------------+\n",
      "|terms                                      |features(numFeatures), [index], [term frequency]|\n",
      "+-------------------------------------------+------------------------------------------------+\n",
      "|[spark, spark, spark, is, awesome, awesome]|(16,[1,15],[4.0,2.0])                           |\n",
      "|[I, love, spark, very, very, much]         |(16,[0,1,2,8,12],[1.0,1.0,1.0,2.0,1.0])         |\n",
      "|[everyone, should, use, spark]             |(16,[1,9,13],[2.0,1.0,1.0])                     |\n",
      "+-------------------------------------------+------------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pipeline.fit(df).transform(df).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You may note that the first document has three distinct terms, but only two term frequencies are obtained. This apparent discrepancy is due to a **hashing collision**: both `spark` and `is` are hashed to index `1`. The term frequency for index `1` in the first document is `4.0`, which corresponds to the three counts of `spark` plus the one count of `is`. The likelihood of a hashing collision can be reduced by increasing the `numFeatures` parameter passed to `HashingTF` (the default is $2^{18} = 262{,}144$)."
   ]
  },
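  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see why a small `numFeatures` makes collisions likely, here is a minimal pure-Python sketch of the hashing trick. It is not Spark's implementation: the helper names `term_to_index` and `hashed_term_frequencies` are made up for this illustration, and MD5 stands in for MurmurHash3, so the indices will not match the output above.\n",
    "\n",
    "```python\n",
    "import hashlib\n",
    "\n",
    "def term_to_index(term, num_features):\n",
    "    # Stand-in hash: MD5 instead of the MurmurHash3 used by HashingTF,\n",
    "    # so these indices are NOT the ones Spark produces.\n",
    "    digest = hashlib.md5(term.encode('utf-8')).hexdigest()\n",
    "    return int(digest, 16) % num_features\n",
    "\n",
    "def hashed_term_frequencies(terms, num_features):\n",
    "    # Terms that land on the same index have their counts added together.\n",
    "    counts = {}\n",
    "    for term in terms:\n",
    "        index = term_to_index(term, num_features)\n",
    "        counts[index] = counts.get(index, 0.0) + 1.0\n",
    "    return dict(sorted(counts.items()))\n",
    "\n",
    "vocabulary = ['spark', 'is', 'awesome', 'I', 'love', 'very', 'much',\n",
    "              'everyone', 'should', 'use']\n",
    "\n",
    "for num_features in (8, 16, 2 ** 18):\n",
    "    used = {term_to_index(term, num_features) for term in vocabulary}\n",
    "    print(num_features, 'buckets ->', len(used), 'distinct indices for',\n",
    "          len(vocabulary), 'terms')\n",
    "```\n",
    "\n",
    "With 8 buckets and 10 distinct terms, at least two terms must share an index, whatever hash function is used. With more buckets a collision becomes less likely, but it is never ruled out."
   ]
  },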
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## CountVectorizer\n",
    "\n",
    "**`CountVectorizer()`** has three parameters that control which terms are kept as features.\n",
    "\n",
    "* minTF: within a document, terms with a term frequency less than minTF are ignored. If `minTF=1`, no terms are removed.\n",
    "* minDF: terms with a document frequency less than minDF are left out of the vocabulary. If `minDF=1`, no terms are removed.\n",
    "* vocabSize: the maximum size of the vocabulary. Only the vocabSize most frequent terms in the corpus are kept.\n",
    "\n",
    "In the example below, `minTF=1.0`, `minDF=1.0`, and `vocabSize=20`, which is larger than the number of distinct terms in the corpus. Therefore, all features (terms) are kept."
   ]
  },
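  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The effect of the three parameters can be sketched in plain Python. This is a simplified illustration, not Spark's code: `build_vocabulary` and `count_vector` are hypothetical helpers, the thresholds are treated as plain counts, and the alphabetical tie-break between equally frequent terms is an assumption.\n",
    "\n",
    "```python\n",
    "from collections import Counter\n",
    "\n",
    "documents = [\n",
    "    ['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'],\n",
    "    ['I', 'love', 'spark', 'very', 'very', 'much'],\n",
    "    ['everyone', 'should', 'use', 'spark'],\n",
    "]\n",
    "\n",
    "def build_vocabulary(docs, min_df=1, vocab_size=20):\n",
    "    # Corpus-wide term counts, and the number of documents containing each term.\n",
    "    term_counts = Counter(term for doc in docs for term in doc)\n",
    "    doc_counts = Counter(term for doc in docs for term in set(doc))\n",
    "    kept = [term for term in term_counts if doc_counts[term] >= min_df]\n",
    "    # Descending corpus frequency; alphabetical tie-break is an assumption.\n",
    "    kept.sort(key=lambda term: (-term_counts[term], term))\n",
    "    return kept[:vocab_size]\n",
    "\n",
    "def count_vector(doc, vocabulary, min_tf=1):\n",
    "    # Sparse {index: count} vector; terms below min_tf are ignored for this document.\n",
    "    counts = Counter(doc)\n",
    "    return {vocabulary.index(term): float(count)\n",
    "            for term, count in counts.items()\n",
    "            if term in vocabulary and count >= min_tf}\n",
    "\n",
    "print(build_vocabulary(documents))                # all ten terms are kept\n",
    "print(build_vocabulary(documents, min_df=2))      # only terms found in two or more documents\n",
    "print(build_vocabulary(documents, vocab_size=3))  # the three most frequent terms\n",
    "```\n",
    "\n",
    "Raising `min_df` or lowering `vocab_size` shrinks the vocabulary for the whole corpus, while `min_tf` only drops terms from the vector of the document in which they are rare."
   ]
  },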
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import CountVectorizer\n",
    "from pyspark.ml import Pipeline\n",
    "\n",
    "countvectorizer = CountVectorizer(minTF=1.0, minDF=1.0, vocabSize=20, \n",
    "                                  inputCol='terms', outputCol='features(vocabSize), [index], [term frequency]')\n",
    "stages = [countvectorizer]\n",
    "pipeline = Pipeline(stages=stages)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------------------------------+----------------------------------------------+\n",
      "|terms                                      |features(vocabSize), [index], [term frequency]|\n",
      "+-------------------------------------------+----------------------------------------------+\n",
      "|[spark, spark, spark, is, awesome, awesome]|(10,[0,1,3],[3.0,2.0,1.0])                    |\n",
      "|[I, love, spark, very, very, much]         |(10,[0,2,4,5,6],[1.0,2.0,1.0,1.0,1.0])        |\n",
      "|[everyone, should, use, spark]             |(10,[0,7,8,9],[1.0,1.0,1.0,1.0])              |\n",
      "+-------------------------------------------+----------------------------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pipeline.fit(df).transform(df).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, let's use `StringIndexer()` to index the corpus and see whether the results are consistent with those of `CountVectorizer()`.\n",
    "\n",
    "### `flatMap` documents so that each row has a single term."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+\n",
      "|   terms|\n",
      "+--------+\n",
      "|   spark|\n",
      "|   spark|\n",
      "|   spark|\n",
      "|      is|\n",
      "| awesome|\n",
      "| awesome|\n",
      "|       I|\n",
      "|    love|\n",
      "|   spark|\n",
      "|    very|\n",
      "|    very|\n",
      "|    much|\n",
      "|everyone|\n",
      "|  should|\n",
      "|     use|\n",
      "|   spark|\n",
      "+--------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.types import StringType\n",
    "df_vocab = df.select('terms').rdd.\\\n",
    "            flatMap(lambda x: x[0]).\\\n",
    "            toDF(schema=StringType()).toDF('terms')\n",
    "df_vocab.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Calculate term frequencies in the corpus"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+----------+\n",
      "|frequency|      term|\n",
      "+---------+----------+\n",
      "|        5|   [spark]|\n",
      "|        2|    [very]|\n",
      "|        2| [awesome]|\n",
      "|        1|      [is]|\n",
      "|        1|       [I]|\n",
      "|        1|    [love]|\n",
      "|        1|[everyone]|\n",
      "|        1|     [use]|\n",
      "|        1|    [much]|\n",
      "|        1|  [should]|\n",
      "+---------+----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "vocab_freq = df_vocab.rdd.countByValue()\n",
    "pdf = pd.DataFrame({\n",
    "        'term': list(vocab_freq.keys()),\n",
    "        'frequency': list(vocab_freq.values())\n",
    "    })\n",
    "pdf\n",
    "tf = spark.createDataFrame(pdf).orderBy('frequency', ascending=False)\n",
    "tf.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Apply `StringIndexer()` to `df_vocab`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import StringIndexer\n",
    "stringindexer = StringIndexer(inputCol='terms', outputCol='StringIndexer(index)')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+--------------------+\n",
      "|   terms|StringIndexer(index)|\n",
      "+--------+--------------------+\n",
      "|   spark|                 0.0|\n",
      "| awesome|                 1.0|\n",
      "|    very|                 2.0|\n",
      "|      is|                 3.0|\n",
      "|everyone|                 4.0|\n",
      "|       I|                 5.0|\n",
      "|    love|                 6.0|\n",
      "|  should|                 7.0|\n",
      "|    much|                 8.0|\n",
      "|     use|                 9.0|\n",
      "+--------+--------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "stringindexer.fit(df_vocab).transform(df_vocab).\\\n",
    "    distinct().\\\n",
    "    orderBy('StringIndexer(index)').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The indexing result is consistent for the three most frequent terms (`spark`, `awesome` and `very`). The remaining terms all have the same frequency, 1, so they cannot be ordered by frequency alone. This is probably why their indices do not all match the results from `CountVectorizer()`."
   ]
  },
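  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The role of ties can be illustrated with a small pure-Python sketch. It indexes the corpus by descending frequency under two different tie-break rules. Both rules and the helper `frequency_index` are hypothetical; which rule `CountVectorizer()` and `StringIndexer()` actually apply to ties is not established here.\n",
    "\n",
    "```python\n",
    "from collections import Counter\n",
    "\n",
    "documents = [\n",
    "    ['spark', 'spark', 'spark', 'is', 'awesome', 'awesome'],\n",
    "    ['I', 'love', 'spark', 'very', 'very', 'much'],\n",
    "    ['everyone', 'should', 'use', 'spark'],\n",
    "]\n",
    "\n",
    "# Corpus-wide term frequencies.\n",
    "corpus_counts = Counter(term for doc in documents for term in doc)\n",
    "\n",
    "# Position at which each term is first seen in the corpus.\n",
    "first_seen = {}\n",
    "for doc in documents:\n",
    "    for term in doc:\n",
    "        first_seen.setdefault(term, len(first_seen))\n",
    "\n",
    "def frequency_index(counts, tie_break):\n",
    "    # Sort by descending frequency, then by the given (hypothetical) tie-break key.\n",
    "    ordered = sorted(counts, key=lambda term: (-counts[term], tie_break(term)))\n",
    "    return {term: index for index, term in enumerate(ordered)}\n",
    "\n",
    "alphabetical = frequency_index(corpus_counts, tie_break=lambda term: term)\n",
    "by_first_seen = frequency_index(corpus_counts, tie_break=first_seen.get)\n",
    "\n",
    "for term in corpus_counts:\n",
    "    print(term, alphabetical[term], by_first_seen[term])\n",
    "```\n",
    "\n",
    "Both orderings put `spark` at index 0, but the terms with frequency 1 get different indices depending on the tie-break rule, which is the kind of mismatch seen above."
   ]
  },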
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
